package pb.wang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

/**
  * Created by admin on 2016/3/29.
  */
object WindowJoinTester {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group) = args
    val sparkConf = new SparkConf().setAppName("KafkaJoiner").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    //ssc.checkpoint("checkpoint")

    val topicAMap = Map("topic_a"->1)
    val leftRdd = KafkaUtils.createStream(ssc, zkQuorum, group, topicAMap).window(Seconds(100), Seconds(10))

    val topicBMap = Map("topic_b"->1)
    val rightRdd = KafkaUtils.createStream(ssc, zkQuorum, group, topicBMap).window(Seconds(10), Seconds(10))

    val joinRdd = leftRdd.join(rightRdd)
    joinRdd.print(100)
    ssc.start()
    ssc.awaitTermination()
  }
}